一. Spark 与 MapReduce 区别
Apache Spark™ is a fast and general engine for large-scale data processing.
与mapreduce比较 :
Spark大多数执行过程是基于内存的迭代
MapReduce 的 优点, SparkCore 都有
Hive 能做的操作, SparkSQL 都能做, 可以写 SQL 语句转换为 SparkCore 代码
Spark Streaming 提供近实时流
超过80个类似于 map, reduce 这样的操作
可以在Tachyon(基于内存的分布式的文件系统 (HDFS 是基于磁盘)) 上运行Spark, 会更快
二. 什么是RDD
1、RDD是Spark提供的核心抽象,全称为Resillient Distributed Dataset,即弹性分布式数据集。
2、RDD在抽象上来说是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同节点上,从而让RDD中的数据可以被并行操作。(分布式数据集)
- 如果读取的文件本来就存在于3个分区, 这些操作会并行操作, 如何并行操作? :TODO
- 如果存在于3个分区, 手动规定了2个分区, 那么是如何工作的 ? :TODO
3、RDD通常通过Hadoop上的文件,即HDFS文件或者Hive表,来进行创建;有时也可以通过应用程序中的集合来创建。
- 从文件系统读取: local 或 HDFS
sc.textFile("/Users/shixuanji/Documents/a.txt",2);
- Hive 表: : TODO
- 并行化的方式创建(多用于测试):
val rdd = sc.makeRDD(1 to 10)
或者val rdd = sc.parallelize(arr);
4、RDD最重要的特性就是,提供了容错性,可以自动从节点失败中恢复过来。即如果某个节点上的RDD partition,因为节点故障,导致数据丢了,那么RDD会自动通过自己的数据来源重新计算该partition。这一切对使用者是透明的。
5、RDD的数据默认情况下存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘。(弹性 == 灵活)
下图中画橙色框的都是 RDD
源码中的注释说明
1、A list of partitions:一组分片(Partition),即数据集的基本组成单位
2、A function for computing each split:一个计算每个分区的函数
3、A list of dependencies on other RDDs:RDD 之间的依赖关系
- NarrowDependency 完全依赖, 窄依赖
- ShuffleDependency 部分依赖, ‘’宽依赖’’
4、Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned):
- Partitioner: 自定义分区使用
5、Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file):一个列表,存储存取每个 Partition 的优先位置(preferred location)。
Spark 中其它重要概念
Cluster Manager:Spark 的集群管理器,主要负责资源的分配与管理。集群管理器分配的资 源属于一级分配,它将各个 Worker 上的内存、CPU 等资源分配给应用程序,但是并不负责 对 Executor 的资源分配。目前,Standalone、YARN、Mesos、K8S,EC2 等都可以作为 Spark 的集群管理器。
Master:Spark 集群的主节点。
Worker:Spark 集群的工作节点。对 Spark 应用程序来说,由集群管理器分配得到资源的 Worker 节点主要负责以下工作:创建 Executor,将资源和任务进一步分配给 Executor,同步 资源信息给 Cluster Manager。
Executor:执行计算任务的一些进程。主要负责任务的执行以及与 Worker、Driver Application 的信息同步。
Driver Appication:客户端驱动程序,也可以理解为客户端应用程序,用于将任务程序转换 为 RDD 和 DAG,并与 Cluster Manager 进行通信与调度。
关系:
1、用户使用 SparkContext 提供的 API(常用的有 textFile、sequenceFile、runJob、stop 等) 编写 Driver Application 程序。此外 SQLContext、HiveContext 及 StreamingContext 对 SparkContext 进行封装,并提供了 SQL、Hive 及流式计算相关的 API。
2、使用 SparkContext 提交的用户应用程序,首先会使用 BlockManager 和 BroadcastManager 将任务的 Hadoop 配置进行广播。然后由 DAGScheduler 将任务转换为 RDD 并组织成 DAG, DAG 还将被划分为不同的 Stage。最后由 TaskScheduler 借助 ActorSystem 将任务提交给集群 管理器(Cluster Manager)。
3、集群管理器(ClusterManager)给任务分配资源,即将具体任务分配到 Worker 上,Worker 创建 Executor 来处理任务的运行。Standalone、YARN、Mesos、EC2 等都可以作为 Spark 的集 群管理器。
注意: 如果是 –deploy-mode client 模式, client 就是 Driver, –deploy-mode cluster 模式, Driver 是由集群分配的一台 worker节点
三. Spark 的架构(standalone)
涉及到的名词: Driver, Master, Worker, Executor , Task
四. Spark 任务提交
参考官网 http://spark.apache.org/docs/latest/submitting-applications.html
Client模式
不指定deploy-mode
,默认就是client模式,也就是哪一台服务器提交spark代码,那么哪一台就是driver服务器。
Cluster模式
需要指定deploy-mode
,driver服务器并不是提交代码的那一台服务器,而是在提交代码的时候,在worker主机上,随机挑选一台作为driver服务器,那么如果提交10个应用,那么就有可能10台driver服务器。
–master spark://xxxxxx
需要启动 spark 集群
–master yarn
不需要启动 spark 集群, 提交的程序由 yarn 管理
1 | # Run application locally on 8 cores |
五. Transformation和action原理
Spark支持两种RDD操作:transformation
和action
。
transformation
操作会针对已有的RDD创建一个新的RDD;- 而
action
则主要是对RDD进行最后的操作,比如遍历、reduce、保存到文件等,并可以返回结果给Driver程序。
例如,map就是一种transformation操作,它用于将已有RDD的每个元素传入一个自定义的函数,并获取一个新的元素,然后将所有的新元素组成一个新的RDD。而reduce就是一种action操作,它用于对RDD中的所有元素进行聚合操作,并获取一个最终的结果,然后返回给Driver程序。
transformation的特点就是lazy特性。lazy特性指的是,如果一个spark应用中只定义了transformation操作,那么即使你执行该应用,这些操作也不会执行。也就是说,transformation是不会触发spark程序的执行的,它们只是记录了对RDD所做的操作,但是不会自发的执行。只有当transformation之后,接着执行了一个action操作,那么所有的transformation才会执行。Spark通过这种lazy特性,来进行底层的spark应用执行的优化,避免产生过多中间结果。
action操作执行,会触发一个spark job
的运行,从而触发这个action之前所有的transformation的执行。这是action的特性。
六. Transformation 和 Action 算子
1.Transformation 算子
或 ../
map:
对调用map的RDD数据集中的每个element都使用func,然后返回一个新的RDD,这个返回的数据集是分布式的数据集
filter:
对调用filter的RDD数据集中的每个元素都使用func,然后返回一个包含使func为true的元素构成的RDD
flatMap:
和map差不多,但是flatMap生成的是多个结果
groupByKey, reduceByKey, sortByKey :
凡是这种.. ByKey 的, 必须传入一个对偶元祖, Java中是 JavaPairRdd
cogroup 与 join 与 union 区别:
- cogroup
- 相当于SQL中的全外关联full outer join,返回左右RDD中的记录,关联不上的为空。
- return: JavaPairRDD[K, (JIterable[V], JIterable[W])
- RDD的value是一个Pair的实例,这个实例包含两个Iterable的值,
V
表示的是RDD1中相同KEY的值,W
表示的是RDD2中相同key的值.
- join
- 相当于SQL中的内关联join,只返回两个RDD根据K可以关联上的结果,join只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。
- leftOuterJoin / rigthOuterJoin
- left: 左边的一定显示, 右边的 Join 上才显示
- right: 右边的一定显示, 左边的 join 上才显示
- union
- 求rdd并集,但是不去重
Intersection,Distinct,Cartesian
- intersection
- intersection 求交集,提取两个rdd中都含有的元素。
- Returns a new RDD that contains the intersection of elements in the source dataset and the argument.
- Distinct (独特的,有区别的)
- 去重
- Return a new RDD containing the distinct elements in this RDD.
- Cartesian (笛卡尔积)
- 笛卡尔积, 全连接, 前后集合个数为a,b, a x b 种组合
mapPartition,reparation,coalesce
mapPartition
- 该函数和map函数类似,只不过映射函数的参数由RDD中的每一个元素变成了
RDD
中每一个分区的迭代器。如果在映射的过程中需要频繁创建额外的对象,使用mapPartitions要比map高效的多。 - 比如,将RDD中的所有数据通过JDBC连接写入数据库,如果使用map函数,可能要为每一个元素都创建一个
connection
,这样开销很大,如果使用mapPartitions
,那么只需要针对每一个分区建立一个connection
。
- 该函数和map函数类似,只不过映射函数的参数由RDD中的每一个元素变成了
coalesce
- coalesce: 只能用于减少分区的数量,而且可以选择不发生shuffle 其实说白了他就是合并分区
- repartition:可以增加分区,也可以减少分区,必须会发生shuffle,相当于就是进行重新分区
reparation
- reparition是
coalesce shuffle
为true
的简易实现
- reparition是
sample 和 aggregateByKey
sample
对RDD中的集合内元素进行采样,第一个参数withReplacement是true表示有放回取样,false表示无放回。第二个参数表示比例
1
2
3
4
5
6
7
8
9/**
*- @param withReplacement can elements be sampled multiple times (replaced when sampled out)
@param fraction expected size of the sample as a fraction of this RDD's size
seed 最好不要动
*/
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T]
aggregateByKey
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
//按照key进行聚合- 其实
reduceBykey
就是aggregateByKey
的简化版。 aggregateByKey
多提供了一个函数 seqOp 类似于Mapreduce的combine操作(就在map端执行reduce的操作)
mapPartitionsWithIndex 和 repartitionAndSortWithinPartitions
- mapPartitionsWithIndex
- 说白了就是可以打印出当前所在分区数
- repartitionAndSortWithinPartitions
- 该方法依据partitioner对RDD进行分区,并且在每个结果分区中按key进行排序;通过对比sortByKey发现,这种方式比先分区,然后在每个分区中进行排序效率高,这是因为它可以将排序融入到shuffle阶段。
2. Action 算子
reduce();
- def reduce(f: JFunction2[T, T, T]): T = rdd.reduce(f)
collect();
- Return an array that contains all of the elements in this RDD.
- this method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.
take(n);
- Take the first num elements of the RDD. This currently scans the partitions one by one, so it will be slow if a lot of partitions are required. In that case, use collect() to get the whole RDD instead.
- this method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.
count();
- Return the number of elements in the RDD.
takeOrdered();
- Returns the first k (smallest) elements from this RDD using the natural ordering for T while maintain the order.
- top(n): 自然排序后,最大的前 n
saveAsTextFile();
- Save this RDD as a text file, using string representations of elements.
countByKey();
- return map<String, Integer>, key 为 key, value 为 key 的数量
takeSample();
- withReplacement:元素可以多次(重复)抽样(在抽样时替换) 如果为 false, 在抽样数 > 样本数时, 只能返回样本数的样本
- num:返回的样本的大小
- seed:随机数生成器的种子
1 | def takeSample( |
七. RDD 的持久化
将数据通过操作持久化(或缓存)在内存中是Spark的重要能力之一。当你缓存了一个RDD,每个节点都缓存了RDD的所有分区。这样就可以在内存中进行计算。这样可以使以后在RDD上的动作更快(通常可以提高10倍)。
你可以对希望缓存的RDD通过使用persist或cache方法进行标记。它通过动作操作第一次在RDD上进行计算后,它就会被缓存在节点上的内存中。Spark的缓存具有容错性,如果RDD的某一分区丢失,它会自动使用最初创建RDD时的转换操作进行重新计算。
另外,RDD可以被持久化成不同的级别。比如,可以允许你存储在磁盘,内存,甚至是序列化的Java对象(节省空间),备份在不同的节点上,或者存储在基于内存的文件系统Tachyon上。通过向persist()方法传递StorageLevel对象来设置。cache方法是使用默认级别StorageLevel.MEMORY_ONLY
的方法。
选持久化方案建议:
- 优先选择MEMORY_ONLY,如果可以用内存缓存所有的数据,那么也就意味着我的计算是纯内存的计算,速度当然快。
- 如果MEMORY_ONLY 缓存不了所有的数据,MEMORY_ONLY_SER 把数据实现序列化然后进行存储。这样也是纯内存操作,速度也快,只不过需要耗费一点cpu资源需要反序列化。
- 也可以选用带_2这种方式, 此方式会存2份, 一份存在本地, 另一份会存到另外的节点。恢复速度的时候可以使用备份。
- 能不能使用DISK的,就不使用DISK,有时候从磁盘读,还不如从新计算一次。
关于tachyon
Spark2.0开始就不把tachyon(现在成为alluxio)集成在自身内部了, 依然可以直接用
基于内存的分布式文件系统
出现原因:
- spark运行以 JVM为基础,所以spark的任务会把数据存入JVM的堆中,随着计算的迭代,JVM堆中存放的数据量迅速增大,对于spark而言,spark的计算引擎和存储引擎处在同一个JVM中,所以会有重复的GC方面的开销。这样就增大了系统的延时。
- 当JVM崩溃时,缓存在JVM堆中的数据也会消失,这个时候spark不得不根据RDD的血缘关系重新计算数据。
- 如果spark需要其他的框架的共享数据,比如就是hadoop的Mapreduce,这个时候就必须通过第三方来共享,比如借助HDFS,那么这样的话,就需要额外的开销,借助的是HDFS,那么就需要磁盘IO的开销。
- 因为我们基于内存的分布式计算框架有以上的问题,那么就促使了内存分布式文件系统的诞生,比如tachyon。
Tachyon可以解决spark的什么问题呢?
如果我们把数据存放到tachyon上面:
- 减少Spark GC的开销。
- 当spark 的JVM崩溃的时候,存放在tachyon上的数据不受影响。
- spark如果要想跟被的计算工具共享数据,只要通过tachyon的Client就可以做到了。并且延迟远低于HDFS等系统。
八. 广播变量 和 累加器
1. 广播变量
案例> 广播 ip 规则, 匹配 ip 十进制地址, 存入 mysql, 完整过程图示
每个 executor 拥有一份, 这个 executor 启动的 task 会共享这个变量
使用了广播变量之后, executor 中所有的 task 都会共享此变量, 否则每个 task 都会发一份
在 Driver 端可以修改广播变量的值,在 Executor 端无法修改广播变量的值。
使用:
1 | // 定义 |
2. 累加器
使用场景: 异常监控,调试,记录符合某特性的数据的数目等
如果一个变量不被声明为一个累加器,那么它将在 被改变时不会在 driver 端进行全局汇总,即在分布式运行时每个 task 运行的只是原始变量的一个副本,并不能改变原始变量的值
但是当这个变量被声明为累加器后,该变量就会有分布式计数的功能。
累加器在 Driver 端定义赋初始值,累加器只能在 Driver 端读取最后的值,在 Excutor 端更 新。
使用:
1 | // 定义 |
九. Spark on Yarn 模式
配置: 只需要在conf/spark-env.sh
中配置export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
就可以了
使用YARN模式的时候,不需要启动master和worker了。
只需要启动HDFS和YARN即可。
与 Standalone
主要区别是: spark-submit
后面的参数中--master
后面的不是 spark://…., 而是 yarn
, 这样:--master yarn
Spark on Yarn 模式注意点:
注意:如果你配置spark-on-yarn的client
模式,其实会报错。
内部有一个内存检测机制
修改所有yarn
节点的yarn-site.xml
,在该文件中添加如下配置
1 | <property> |
–deploy-mode client
–deploy-mode cluster
十. 宽窄依赖
窄依赖是指父RDD的每个分区都只被子RDD一个分区使用。(独生, NarrowDependency)
宽依赖就是指父RDD的分区被多个子RDD的分区所依赖。 (超生, ShuffleDependency)
十一. Stage 划分
开发完一个应用以后,把应用提交到集群,那么这个应用就叫做Application
这个应用里面我们开发了好多代码,这些代码里面凡是遇到一个action操作,就会产生一个job任务。
也就意味着,一个Application有一个或者一个以上的job任务。
然后这些job任务划分为不同stage去执行,stage里面就是运行不同的task任务。
遇到一个 shuffle 算子, 就会从中间分开, 划分为2个 stage
Task计算的就是分区上面的数据。
十二. Spark 任务调度
Shuffle 机制见下一篇
1.简版
2.完整版
注意:
- AppClient / clientActor:在 Standalone 模式下的实现是
StandaloneAppClient
类 - dirverActor: :TODO
- 第7点,
ApplicationDescription
参数中封装的是一些系统信息, 和 用户设置的 cpu 的核数, 执行的线程数之类的数据 - worker 端是在全图的 第 19步(action 算子触发后, 会提交 job )才知道, 去哪里计算数据.
十三. TopN 案例
对一个文件里面的单词进行单词计数,然后取前3个出现次数最多的三个单词。
1 | object TopN { |
十四. 网站访问日志分析
需求分析
需求一 :
The average, min, and max content size of responses returned from the server.
需求二:
A count of response code’s returned.
需求三:
All IPAddresses that have accessed this server more than N times.
需求四:
The top endpoints requested by count. TopN 找出被访问次数最多的地址的前三个
Data
1 | log.txt |
Code
1 | class ApacheAccesslog |
1 | class LogAnalyer |
十五. 线程池
3种线程池的构造方式见代码
1 | import java.util.concurrent.Executor; |